package util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.lang.reflect.Field;
import java.util.*;

/**
 * Flink sink that writes backtick-delimited records into a single HBase table.
 *
 * <p>Each incoming string is split on {@code `}; the first token is used as the
 * row key and the remaining tokens are written as columns under the {@code info}
 * column family, named after the declared fields of the POJO class supplied at
 * construction time (in declaration order).
 */
public class MyHBaseSink extends RichSinkFunction<String> {

    /** Every column is written under this single column family. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    // Created in open() on the task manager; Connection is not serializable,
    // so it must not be part of the sink's serialized state.
    private transient Connection connection;
    private final String tableName;
    // Column qualifiers, derived once from the POJO's declared fields.
    private final String[] fieldsName;

    /**
     * Returns the declared field names of {@code T}, in declaration order.
     * (Name kept as-is — including the historical typo — for caller compatibility.)
     *
     * @param T the class whose declared fields are inspected
     * @return one name per declared field
     */
    public static String[] getFiledName(Class T) {
        Field[] fields = T.getDeclaredFields();
        String[] fieldName = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            fieldName[i] = fields[i].getName();
        }
        return fieldName;
    }

    /**
     * @param T         POJO class whose field names become column qualifiers
     * @param tableName fully-qualified HBase table name
     */
    public MyHBaseSink(Class T, String tableName) {
        this.tableName = tableName;
        // Resolve field names eagerly so the Class itself need not be retained.
        this.fieldsName = getFiledName(T);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = MyHBaseUtil.getConf();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] parts = value.split("`");
        // First token is the row key.
        Put put = new Put(Bytes.toBytes(parts[0]));
        // Bug fix: the original accumulated the same Put N times into a shared
        // member list that was never cleared, re-submitting every prior record's
        // Puts on each invoke and growing memory without bound. Build and write
        // exactly one Put per record instead.
        // Also bound the loop so a record with fewer tokens than declared fields
        // no longer throws ArrayIndexOutOfBoundsException.
        int columns = Math.min(fieldsName.length, parts.length);
        for (int i = 0; i < columns; i++) {
            put.addColumn(FAMILY, Bytes.toBytes(fieldsName[i]), Bytes.toBytes(parts[i]));
        }
        // Table instances are lightweight but not thread-safe and must be closed
        // after use; acquire per call and release via try-with-resources.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    @Override
    public void close() throws Exception {
        // Guard against close() being called when open() never ran.
        if (connection != null) {
            connection.close();
        }
    }

}
